feat: Expose prediction SSE streams #3019
Conversation
I'm Bonk, and I've done a quick review of your PR.
This PR adds Server-Sent Events (SSE) streaming for predictions with event replay for late subscribers. The implementation looks solid overall with good test coverage. I found one potential resource concern that should be addressed:
Issues
1. Unbounded memory growth in stream_history (Medium)
The stream_history Vec in Prediction grows unbounded during prediction execution. For long-running predictions with frequent output chunks, logs, or metrics, this could cause memory exhaustion. Consider adding a limit to the history size or making it configurable.
I've completed my review of PR #3019. The PR adds SSE streaming for predictions with event replay functionality. The implementation is well-structured and includes comprehensive tests. Summary of findings: I identified one medium-severity issue regarding unbounded memory growth in the `stream_history` Vec. The rest of the implementation looks solid.
Based on my review of PR #3019, I agree with the previous reviewer's finding about unbounded memory growth in `stream_history`. The implementation is otherwise well-structured. LGTM
cfa865e to c61e323
Based on my thorough review of PR #3019, I have analyzed all the changed files. My independent assessment: no additional actionable issues beyond what was already flagged. LGTM
After reviewing PR #3019, I've analyzed the changes to add Server-Sent Events (SSE) streaming for predictions. No additional actionable issues found beyond the previously flagged one. LGTM
Based on my thorough review of PR #3019, I can see this PR adds Server-Sent Events (SSE) streaming for predictions. The previously flagged issue has been addressed. LGTM
Based on my review of PR #3019, I can confirm that the previously flagged issue regarding unbounded memory growth in `stream_history` has been fixed. The implementation is well-structured, with comprehensive test coverage including unit tests for lagged streams, truncated replay, and integration tests. LGTM
michaeldwan left a comment
Solid design. Main point discussed offline: streaming needs to be opt-in at the handler level via a decorator/attribute. Five concrete changes in the comments.
```rust
fn emit_stream_event(&mut self, event: PredictionStreamEvent) {
    if self.stream_history.len() == MAX_STREAM_HISTORY_EVENTS {
        self.stream_history.remove(0);
```
Vec::remove(0) is O(n) on every event after the buffer fills -- thousands of shifts per prediction for token-by-token LLM output. VecDeque::pop_front() is O(1).
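A minimal sketch of the suggested fix, with a simplified `String` event standing in for `PredictionStreamEvent` (the struct and cap names are taken from the diff; everything else is illustrative):

```rust
use std::collections::VecDeque;

const MAX_STREAM_HISTORY_EVENTS: usize = 1024;

struct Prediction {
    // VecDeque instead of Vec: evicting the oldest event is O(1).
    stream_history: VecDeque<String>,
}

impl Prediction {
    fn emit_stream_event(&mut self, event: String) {
        if self.stream_history.len() == MAX_STREAM_HISTORY_EVENTS {
            // O(1) pop_front, vs the O(n) element shift of Vec::remove(0).
            self.stream_history.pop_front();
        }
        self.stream_history.push_back(event);
    }
}
```

After the buffer fills, each new event costs a constant-time pop instead of shifting the whole history.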
```rust
prediction_id.clone(),
input.clone(),
webhook_sender,
response_mode != PredictionResponseMode::AsyncJson,
```
response_mode != PredictionResponseMode::AsyncJson is true for SyncJson too. Should be response_mode == PredictionResponseMode::AsyncSse -- the current expression is inert for sync mode but reads as a bug.
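A sketch of the suggested change; the enum variants are assumed from the names appearing in the diff and this comment:

```rust
#[derive(PartialEq)]
enum PredictionResponseMode {
    SyncJson,
    AsyncJson,
    AsyncSse,
}

fn wants_sse_stream(mode: &PredictionResponseMode) -> bool {
    // Match the SSE variant positively. `mode != AsyncJson` is also true
    // for SyncJson, which reads as a bug even if it is inert today.
    *mode == PredictionResponseMode::AsyncSse
}
```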
```rust
impl Prediction {
    pub fn new(id: String, webhook: Option<WebhookSender>) -> Self {
        let (stream_tx, _) = tokio::sync::broadcast::channel(1024);
```
Channel capacity and history cap are both 1024 but defined independently. Use a shared constant so they can't drift.
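A dependency-free sketch of the shared-constant idea. The PR uses `tokio::sync::broadcast`; std's bounded `sync_channel` stands in here, and all names are illustrative:

```rust
use std::collections::VecDeque;
use std::sync::mpsc;

/// Single source of truth for the channel capacity and the replay history
/// cap, so the two sizes cannot drift apart.
const STREAM_BUFFER_EVENTS: usize = 1024;

fn new_stream() -> (mpsc::SyncSender<String>, mpsc::Receiver<String>, VecDeque<String>) {
    // Both buffers sized from the same constant.
    let (tx, rx) = mpsc::sync_channel(STREAM_BUFFER_EVENTS);
    let history: VecDeque<String> = VecDeque::with_capacity(STREAM_BUFFER_EVENTS);
    (tx, rx, history)
}
```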
```rust
if self.service.stream_receiver_count(&self.id) == 0
```
The async cleanup task calls remove_prediction while the SSE client may still be draining. After removal, stream_receiver_count returns unwrap_or(0) and prediction_is_terminal returns unwrap_or(true) -- which happen to be the safe defaults. That invariant needs a comment, or hold an Arc to the entry in the guard so it doesn't depend on post-removal lookup.
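A sketch of the Arc-in-the-guard option, with a hypothetical `PredictionEntry` standing in for the service's map entry:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Simplified stand-in for the service's per-prediction state.
struct PredictionEntry {
    id: String,
}

// The guard clones the Arc when the SSE stream is opened, so draining the
// stream never depends on a lookup that may run after remove_prediction.
fn open_stream_guard(
    predictions: &HashMap<String, Arc<PredictionEntry>>,
    id: &str,
) -> Option<Arc<PredictionEntry>> {
    predictions.get(id).cloned()
}
```

Once the guard holds the Arc, removing the map entry cannot invalidate the client's view of the prediction.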
```
cog serve --upload-url http://unused/
```

```
curl -H Accept:text/event-stream PUT /predictions/sse-stream-test '{"id":"sse-stream-test","input":{}}'
stdout 'event: output'
```
Missing stdout 'event: start' -- should verify the full lifecycle.
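The suggested addition, sketched against the testscript above (`event: start` is the lifecycle event name implied by this comment):

```
curl -H Accept:text/event-stream PUT /predictions/sse-stream-test '{"id":"sse-stream-test","input":{}}'
stdout 'event: start'
stdout 'event: output'
```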
Summary
Extends `POST /predictions` and `PUT /predictions/{id}` with Server-Sent Events by returning an SSE stream when requests send `Accept: text/event-stream`. `Prefer: respond-async` without SSE still returns `202` JSON.

CLI behavior

Does not add a `cog predict --stream` option or otherwise change the predict CLI.